package com.wss.lsl.test.driven.concurrent;

import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.junit.Assert;

/**
 * Producer thread that feeds successive probable primes into a blocking queue
 * and uses thread interruption as its cancellation mechanism.
 * <p>
 * The thread counts down the {@code begin} latch once when it starts running
 * and counts down the {@code end} latch when {@link #run()} is about to
 * return, whether it stops because the interrupt flag was seen or because a
 * blocking {@code put} was interrupted.
 */
public class PrimeProducer extends Thread {

	private final BlockingQueue<BigInteger> queue;
	private final CountDownLatch begin;
	private final CountDownLatch end;

	/**
	 * Creates a producer that has not been started yet.
	 *
	 * @param begin
	 *            latch counted down once when the producer starts running
	 * @param queue
	 *            destination queue for the generated primes
	 * @param end
	 *            latch counted down when the producer is about to finish
	 */
	public PrimeProducer(CountDownLatch begin, BlockingQueue<BigInteger> queue,
			CountDownLatch end) {
		super();
		this.begin = begin;
		this.queue = queue;
		this.end = end;
	}

	/**
	 * Puts 2, 3, 5, 7, ... into the queue until the thread is interrupted.
	 */
	@Override
	public void run() {
		try {
			// Signal start-up exactly once, before any blocking call, so a
			// caller waiting on the latch is released even when the very
			// first put blocks on a full queue.
			begin.countDown();

			BigInteger p = BigInteger.ONE;
			while (!Thread.currentThread().isInterrupted()) {
				System.out.println("put");
				// Advance the candidate; without the reassignment the same
				// prime would be enqueued on every iteration.
				p = p.nextProbablePrime();
				queue.put(p);
			}
		} catch (InterruptedException e) {
			// Restore the interrupt status and fall through so the thread
			// is allowed to exit.
			Thread.currentThread().interrupt();
		} finally {
			end.countDown();
		}
	}

	/**
	 * Requests cancellation by interrupting this thread. No extra locking is
	 * used: the method only delegates to {@link Thread#interrupt()}.
	 */
	public void cancel() {
		interrupt();
	}

	/**
	 * Demo: starts a producer against an already full queue, cancels it while
	 * it is blocked, and verifies that the thread terminates.
	 *
	 * @param args
	 *            unused
	 * @throws InterruptedException
	 *             if the main thread is interrupted while waiting
	 */
	public static void main(String[] args) throws InterruptedException {
		// Capacity 2 with two elements: the producer's first put must block.
		BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<BigInteger>(2);
		queue.put(new BigInteger("1"));
		queue.put(new BigInteger("2"));

		CountDownLatch begin = new CountDownLatch(1);
		CountDownLatch end = new CountDownLatch(1);

		PrimeProducer producer = new PrimeProducer(begin, queue, end);
		producer.start();

		begin.await();
		producer.cancel();
		System.out.println("任务已经取消");

		// Block here until the producer signals that it is finishing.
		end.await();
		// The latch is released from inside run(), i.e. before the thread has
		// actually terminated; join() closes that window before the state
		// check.
		producer.join();
		// Checked by hand rather than with a test-framework assertion, since
		// this runs outside any test runner.
		if (producer.getState() != Thread.State.TERMINATED) {
			throw new AssertionError("expected " + Thread.State.TERMINATED
					+ " but was " + producer.getState());
		}
		System.out.println("Done!");
	}

}
